home *** CD-ROM | disk | FTP | other *** search
- {*********************************************************}
- {* AAThdCpy *}
- {* Copyright (c) Julian M Bucknall 1998-2000 *}
- {* All rights reserved. *}
- {*********************************************************}
- {* Algorithms Alfresco: multithreaded multibuffered copy *}
- {*********************************************************}
-
- {Note: this unit is released as freeware. In other words, you are free
- to use this unit in your own applications, however I retain all
- copyright to the code. JMB}
-
- unit AAThdCpy;
-
- interface
-
- uses
- SysUtils, Windows, Classes;
-
- procedure AAThreadedCopyStream(aSrcStream, aDestStream : TStream);
-
- implementation
-
- const
- BufferSize = 1024; {capacity in bytes of the bBlock array in each
-   TBuffer; also the byte count the producer asks the source stream
-   for on every read}
-
- type
- PBuffer = ^TBuffer;
- TBuffer = packed record {one queue slot: a data block plus its length}
- bCount : longint; {number of valid bytes in bBlock; a value of 0 is
-   what the consumer treats as the end of the data}
- bBlock : array [0..pred(BufferSize)] of byte; {the bytes themselves}
- end;
-
- PBufferArray = ^TBufferArray;
- TBufferArray = array [0..1023] of PBuffer; {only used through
-   PBufferArray: the table that is really allocated holds as many
-   entries as were requested when the queue was created.
-   NOTE(review): the declared bound of 1024 entries presumably limits
-   the usable buffer count when range checking is on - confirm}
-
- TQueuedBuffers = class {fixed set of preallocated buffers used as a
-   circular queue; two semaphores count the free and the filled slots}
- private
- FBufCount : integer; {number of buffers in the queue}
- FBuffers : PBufferArray; {table of pointers to the buffers}
- FHead : integer; {index of the buffer returned by Head}
- FIsNotEmpty : THandle; {semaphore created with a count of 0}
- FIsNotFull : THandle; {semaphore created with a count of FBufCount}
- FTail : integer; {index of the buffer returned by Tail}
- protected
- function qbGetHead : PBuffer; {returns FBuffers^[FHead]}
- function qbGetTail : PBuffer; {returns FBuffers^[FTail]}
- public
- constructor Create(aBufferCount : integer); {allocates aBufferCount
-   buffers and creates both semaphores}
- destructor Destroy; override; {closes the semaphores, frees the buffers}
-
- procedure AdvanceHead; {steps FHead on by one, wrapping to 0}
- procedure AdvanceTail; {steps FTail on by one, wrapping to 0}
-
- property Head : PBuffer read qbGetHead; {buffer at the head index}
- property Tail : PBuffer read qbGetTail; {buffer at the tail index}
-
- property IsNotEmpty : THandle read FIsNotEmpty; {handle to wait on
-   before using Head, and to release after filling Tail}
- property IsNotFull : THandle read FIsNotFull; {handle to wait on
-   before using Tail, and to release after emptying Head}
- end;
-
- type
- TProducer = class(TThread) {thread that reads a stream block by block
-   into the tail of a TQueuedBuffers queue}
- private
- FStream : TStream; {stream that is read; never freed by this class}
- FBuffers : TQueuedBuffers; {queue that is filled; never freed by this
-   class}
- protected
- procedure Execute; override; {reads until a read returns 0 bytes}
- public
- constructor Create(aStream : TStream;
- aBuffers : TQueuedBuffers); {creates the thread suspended}
- destructor Destroy; override;
- end;
-
- type
- TConsumer = class(TThread) {thread that writes the blocks found at the
-   head of a TQueuedBuffers queue to a stream}
- private
- FStream : TStream; {stream that is written; never freed by this class}
- FBuffers : TQueuedBuffers; {queue that is emptied; never freed by this
-   class}
- protected
- procedure Execute; override; {writes until a buffer with a count of 0
-   is found}
- public
- constructor Create(aStream : TStream;
- aBuffers : TQueuedBuffers); {creates the thread suspended}
- destructor Destroy; override;
- end;
-
-
- {===TQueuedBuffers===================================================}
- {Creates a queue of aBufferCount buffers, each sizeof(TBuffer) bytes,
-  together with the two semaphores that count its free and filled slots.
-  Raises an exception if aBufferCount is less than 1, if memory cannot
-  be allocated, or if either semaphore cannot be created.}
- constructor TQueuedBuffers.Create(aBufferCount : integer);
- var
-   i : integer;
- begin
-   inherited Create;
-   {a queue without any slot could never pass a single block along}
-   if (aBufferCount < 1) then
-     raise Exception.Create(
-       'TQueuedBuffers.Create: buffer count must be at least 1');
-   {allocate the zero-filled pointer table and record its length
-    *before* allocating the blocks: should one of the GetMem calls
-    below raise, Destroy then walks the whole table and frees the
-    blocks already obtained (the remaining entries are still nil)
-    instead of leaking them}
-   FBuffers := AllocMem(aBufferCount * sizeof(pointer));
-   FBufCount := aBufferCount;
-   for i := 0 to pred(aBufferCount) do
-     GetMem(FBuffers^[i], sizeof(TBuffer));
-   {create the semaphores: every slot starts out free, none is filled.
-    A nil name asks for an unnamed object; a failed creation is
-    reported at once rather than leaving the threads unsynchronised}
-   FIsNotFull := CreateSemaphore(nil, aBufferCount, aBufferCount, nil);
-   Win32Check(FIsNotFull <> 0);
-   FIsNotEmpty := CreateSemaphore(nil, 0, aBufferCount, nil);
-   Win32Check(FIsNotEmpty <> 0);
- end;
- {--------}
- {Releases everything the constructor obtained: each buffer, the table
-  of buffer pointers, and the two semaphore handles. Entries or handles
-  that were never set are skipped.}
- destructor TQueuedBuffers.Destroy;
- var
-   Slot : integer;
- begin
-   {give back the blocks first, then the table that points at them}
-   if Assigned(FBuffers) then begin
-     for Slot := 0 to FBufCount - 1 do
-       if Assigned(FBuffers^[Slot]) then
-         FreeMem(FBuffers^[Slot], sizeof(TBuffer));
-     FreeMem(FBuffers, FBufCount * sizeof(pointer));
-   end;
-   {close whichever semaphores were actually created}
-   if (FIsNotEmpty <> 0) then
-     CloseHandle(FIsNotEmpty);
-   if (FIsNotFull <> 0) then
-     CloseHandle(FIsNotFull);
-   inherited Destroy;
- end;
- {--------}
- {Moves the head index on to the next buffer; stepping onto index
-  FBufCount wraps round to the first buffer.}
- procedure TQueuedBuffers.AdvanceHead;
- begin
-   if (succ(FHead) = FBufCount) then
-     FHead := 0
-   else
-     FHead := succ(FHead);
- end;
- {--------}
- {Moves the tail index on to the next buffer; stepping onto index
-  FBufCount wraps round to the first buffer.}
- procedure TQueuedBuffers.AdvanceTail;
- begin
-   if (succ(FTail) = FBufCount) then
-     FTail := 0
-   else
-     FTail := succ(FTail);
- end;
- {--------}
- {Read accessor of the Head property: the buffer at the head index.}
- function TQueuedBuffers.qbGetHead : PBuffer;
- begin
-   qbGetHead := FBuffers^[FHead];
- end;
- {--------}
- {Read accessor of the Tail property: the buffer at the tail index.}
- function TQueuedBuffers.qbGetTail : PBuffer;
- begin
-   qbGetTail := FBuffers^[FTail];
- end;
- {====================================================================}
-
-
- {===TProducer========================================================}
- {Creates the reading thread in the suspended state and remembers the
-  stream to read from and the queue to fill. Neither object is owned by
-  the thread.}
- constructor TProducer.Create(aStream : TStream;
-                              aBuffers : TQueuedBuffers);
- begin
-   {true = create suspended}
-   inherited Create(true);
-   FBuffers := aBuffers;
-   FStream := aStream;
- end;
- {--------}
- {Nothing of its own to release: the stream and the queue belong to the
-  caller, so this only runs the inherited destructor.}
- destructor TProducer.Destroy;
- begin
-   inherited;
- end;
- {--------}
- {Thread body: reads the source stream in blocks of up to BufferSize
-  bytes, placing each block in the tail buffer of the queue. The block
-  whose count is 0 is queued as well; it is the end-of-data marker the
-  consumer stops on. If the stream raises while being read, the marker
-  is still queued before the exception is passed on, so that the
-  consumer is never left waiting for data that will not arrive.}
- procedure TProducer.Execute;
- var
-   Tail      : PBuffer;
-   BytesRead : longint;
- begin
-   {do until the stream is exhausted...}
-   repeat
-     {get the 'queue is not full' semaphore}
-     WaitForSingleObject(FBuffers.IsNotFull, INFINITE);
-     {read a block from the stream into the tail buffer}
-     Tail := FBuffers.Tail;
-     try
-       BytesRead := FStream.Read(Tail^.bBlock, BufferSize);
-     except
-       {the read failed: publish an end-of-data marker so the consumer
-        terminates, then let the exception continue on its way}
-       Tail^.bCount := 0;
-       FBuffers.AdvanceTail;
-       ReleaseSemaphore(FBuffers.IsNotEmpty, 1, nil);
-       raise;
-     end;
-     Tail^.bCount := BytesRead;
-     {advance the tail pointer}
-     FBuffers.AdvanceTail;
-     {as we've written a new buffer, signal the 'queue is not empty'
-      semaphore; from here on the buffer belongs to the consumer, which
-      is why the loop test uses the local copy of the count}
-     ReleaseSemaphore(FBuffers.IsNotEmpty, 1, nil);
-   until (BytesRead = 0);
- end;
- {====================================================================}
-
-
- {===TConsumer========================================================}
- {Creates the writing thread in the suspended state and remembers the
-  stream to write to and the queue to empty. Neither object is owned by
-  the thread.}
- constructor TConsumer.Create(aStream : TStream;
-                              aBuffers : TQueuedBuffers);
- begin
-   {true = create suspended}
-   inherited Create(true);
-   FBuffers := aBuffers;
-   FStream := aStream;
- end;
- {--------}
- {Nothing of its own to release: the stream and the queue belong to the
-  caller, so this only runs the inherited destructor.}
- destructor TConsumer.Destroy;
- begin
-   inherited;
- end;
- {--------}
- {Thread body: takes filled buffers from the head of the queue and
-  writes them to the destination stream until it meets a buffer whose
-  count is 0 (the end-of-data marker). WriteBuffer is used so that an
-  incomplete write raises instead of silently dropping bytes. If a
-  write fails, the rest of the queue is still emptied, without being
-  written, up to the end-of-data marker before the exception is passed
-  on; otherwise the producer could block for ever on a full queue.}
- procedure TConsumer.Execute;
- var
-   Head : PBuffer;
- begin
-   while true do begin
-     {get the 'queue is not empty' semaphore}
-     WaitForSingleObject(FBuffers.IsNotEmpty, INFINITE);
-     {get the head buffer; an empty one means there is no more data}
-     Head := FBuffers.Head;
-     if (Head^.bCount = 0) then
-       Break;
-     try
-       {write the whole block from the head buffer into the stream}
-       FStream.WriteBuffer(Head^.bBlock, Head^.bCount);
-     except
-       {discard this buffer and every following one, handing each slot
-        back to the producer, until the end-of-data marker turns up}
-       repeat
-         FBuffers.AdvanceHead;
-         ReleaseSemaphore(FBuffers.IsNotFull, 1, nil);
-         WaitForSingleObject(FBuffers.IsNotEmpty, INFINITE);
-       until (FBuffers.Head^.bCount = 0);
-       raise;
-     end;
-     {advance the head pointer}
-     FBuffers.AdvanceHead;
-     {as we've read a buffer, signal the 'queue is not full' semaphore}
-     ReleaseSemaphore(FBuffers.IsNotFull, 1, nil);
-   end;
- end;
- {====================================================================}
-
-
- {===Interfaced routine===============================================}
- {Copies aSrcStream to aDestStream using two threads: a producer that
-  reads the source into a queue of 20 buffers and a consumer that
-  writes the queued buffers to the destination. Reading starts at the
-  source's current position and continues until a read returns no
-  bytes. The call returns once both threads have finished. Neither
-  stream is freed.
-  Raises an exception if either stream is nil.}
- procedure AAThreadedCopyStream(aSrcStream, aDestStream : TStream);
- var
-   Buffers : TQueuedBuffers;
-   Producer : TProducer;
-   Consumer : TConsumer;
-   WaitArray : array [0..1] of THandle;
- begin
-   {reject missing streams here, in the caller's thread: otherwise the
-    failure would only happen inside one worker thread, which can leave
-    the other worker - and the wait below - blocked for ever}
-   if (aSrcStream = nil) or (aDestStream = nil) then
-     raise Exception.Create(
-       'AAThreadedCopyStream: both streams must be assigned');
-   Buffers := nil;
-   Producer := nil;
-   Consumer := nil;
-   try
-     {create the queued buffer object (20 buffers) and the two threads}
-     Buffers := TQueuedBuffers.Create(20);
-     Producer := TProducer.Create(aSrcStream, Buffers);
-     Consumer := TConsumer.Create(aDestStream, Buffers);
-     {save the thread handles so we can wait on them}
-     WaitArray[0] := Producer.Handle;
-     WaitArray[1] := Consumer.Handle;
-     {start the threads up}
-     Consumer.Resume;
-     Producer.Resume;
-     {wait for both threads to finish}
-     WaitForMultipleObjects(2, @WaitArray, true, INFINITE);
-   finally
-     {the threads go first: the queue they use must outlive them}
-     Producer.Free;
-     Consumer.Free;
-     Buffers.Free;
-   end;
- end;
- {====================================================================}
-
- end.
-